package elitequant.event;

import java.math.BigDecimal;
import java.util.EnumSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.sun.jna.NativeLibrary;

import elitequant.data.TickEvent;
import elitequant.order.FillEvent;
import elitequant.order.OrderStatus;
import elitequant.order.OrderStatusEvent;
import elitequant.util.UtilFunc;
import nanomsg.Nanomsg.SocketFlag;
import nanomsg.Socket;
import nanomsg.pair.PairSocket;
import nanomsg.pubsub.SubSocket;

public class ClientMq{
    
    static {
        String path = ClientMq.class.getResource("/elitequant/server").getPath();
        
        path = path.replaceFirst("/", "");
        path = path.replaceAll("%20", " ");
        
        System.out.println("load nanomsg lib from:" + path);
        
        NativeLibrary.addSearchPath("nanomsg", path);
    }
    
    private EventsEngine eventsEngine;

    private boolean active;

    private LinkedBlockingQueue<String> outgoingQuue;

    private Thread recvTickThread;
    private Thread recvMsgThread;
    private Thread sendMsgThread;

    private Socket tickSock;
    
    private Socket msgSock;

    public ClientMq(EventsEngine eventsEngine) {
        this.eventsEngine = eventsEngine;

        outgoingQuue = new LinkedBlockingQueue<String>();
        
        recvTickThread = new RecvTickThread();
        recvMsgThread = new RecvMsgThread();
        sendMsgThread = new SendMsgThread();
        
        tickSock = new SubSocket();
        msgSock = new PairSocket();
    }

    private class RecvTickThread extends Thread{
    
        @Override
        public void run() {
            while (active) {
                try {
                    // default utf-8 encoding
                    String msg = tickSock.recvString(EnumSet.of(SocketFlag.NN_DONTWAIT)); 
                    if (msg != null && msg.indexOf("|") > 0) {
                        
                        if(msg.endsWith("\0")) {
                            msg = msg.substring(0, msg.length() - 1);
                        }
                        
                        String[] v = msg.split("\\|");
                        TickEvent k = new TickEvent();
                        k.fullSymbol = v[0].trim();
                        k.price = new BigDecimal(v[3].trim());
                        eventsEngine.put(k);
                    }
                    else {
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    try {
                        Thread.sleep(1000);   
                    } catch (Exception e2) {
                        // TODO: handle exception
                    }
//                    System.out.println(String.format("'SUB error:", e.getMessage()));
                }
            }
        }
        
    }
    
    private class RecvMsgThread extends Thread{

        @Override
        public void run() {
            while (active) {
                
                try {
                    // default utf-8 encoding
                    String msg = msgSock.recvString(EnumSet.of(SocketFlag.NN_DONTWAIT)); 
                    if (msg != null && msg.indexOf("|") > 0) {
                        
                        if(msg.endsWith("\0")) {
                            msg = msg.substring(0, msg.length() - 1);
                        }
                        
                        if(msg.endsWith("\\x00")) {
                            msg = msg.substring(0, msg.length() - 1);
                        }

                        GeneralEvent k = new GeneralEvent();
                        k.content = msg;
                        eventsEngine.put(k);
                        
                        String[] v = msg.split("\\|");
                        if ("s".equals(v[0].trim())) {
                            OrderStatusEvent m = new OrderStatusEvent();
                            m.brokerOrderId = Integer.parseInt(v[1].trim());
                            m.internalOrderId = m.brokerOrderId;
                            m.orderStatus = OrderStatus.values()[Integer
                                    .parseInt(v[2].trim())];
                            eventsEngine.put(m);
                        } else if ("f".equals(v[0])) {
                            FillEvent f = new FillEvent();
                            f.brokerOrderId = Integer.parseInt(v[1].trim());
                            f.internalOrderId = f.brokerOrderId;
                            f.timestamp = UtilFunc.parseDate(v[2].trim());
                            f.fillPrice = new BigDecimal(v[3].trim());
                            f.fillSize = Integer.parseInt(v[4].trim());
                            eventsEngine.put(f);
                        }
                        
                    }
                    else {
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    try {
                        Thread.sleep(1000);   
                    } catch (Exception e2) {
                        // TODO: handle exception
                    }
//                    System.out.println(String.format("'PAIR in error:", e.getMessage()));
                }
            }
        }
        
    }
    
    private class SendMsgThread extends Thread{

        @Override
        public void run() {
            while (active) {
                
                try {
                    String msg = outgoingQuue.poll(300, TimeUnit.MILLISECONDS);
                    if(msg != null) {
                        System.out.println(msg);
                        msgSock.send(msg, EnumSet.of(SocketFlag.NN_DONTWAIT));   
                    }
                }catch(Exception e) {
                    System.out.println(String.format("'PAIR out error:", e.getMessage()));
                }
            }
        }
        
    }
    /**
     * start the mq thread
     */
    public void start() {

        tickSock.connect("tcp://127.0.0.1:55555");
        msgSock.connect("tcp://127.0.0.1:55558");
        
        active = true;
        
        if (!recvTickThread.isAlive()) {
            recvTickThread.start();
        }
        if (!recvMsgThread.isAlive()) {
            recvMsgThread.start();
        }
        if (!sendMsgThread.isAlive()) {
            sendMsgThread.start();
        }
    }

    /**
     * stop the mq thread
     */
    public void stop() {
        tickSock.close();
        msgSock.close();

        active = false;
        try {
            recvTickThread.join();
            recvMsgThread.join();
            sendMsgThread.join();
        } catch (Exception e) {
            // ignore
        }
    }

    /**
     * send message by MQ
     * 
     * @param msg
     */
    public void send(String msg) {
        outgoingQuue.add(msg);
    }
    
    public void subscribe(String[] symbols) {
        for(String symbol : symbols) {
            tickSock.subscribe(symbol);   
        }
    }
    
    public void subscribe(String symbol) {
        tickSock.subscribe(symbol);
    }

 

}
